package org.snake.nebulae.core.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

@Slf4j
public class TaskExecutorManager {

    private final ThreadPoolTaskScheduler timerTaskPool;

    /**
     * 每个任务所对应的线程池
     * 任务 -> 线程池
     */
    private final ConcurrentHashMap<Class<?>, ThreadPoolTaskScheduler> pool = new ConcurrentHashMap<>();

    /**
     * 每个任务所对应执行完的任务
     * 任务 -> 完成线程池
     */
    private final ConcurrentHashMap<Class<?>, CopyOnWriteArrayList<TaskWrap>> finishedTasks = new ConcurrentHashMap<>();

    public TaskExecutorManager() {
        this.timerTaskPool = TaskExecutorFactory.createPool(this);
    }

    /**
     * 为任务执行管理器添加一个后台定时执行的任务
     *
     * @param task  任务
     * @param delay 每次延迟多久
     * @return 任务结果
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
        return this.timerTaskPool.scheduleWithFixedDelay(task, delay);
    }

    public void submit(TaskWrap taskWrap) {
        if (!pool.containsKey(taskWrap.getKlass())) {
            // 初始化 各自队列
            pool.put(taskWrap.getKlass(), TaskExecutorFactory.createPool(taskWrap));
            finishedTasks.put(taskWrap.getKlass(), new CopyOnWriteArrayList<>());
        }
        putTask(taskWrap, pool);
    }

    private void putTask(TaskWrap taskWrap, ConcurrentHashMap<Class<?>, ThreadPoolTaskScheduler> threadPool) {
        if (Objects.isNull(taskWrap.getCallable())) {
            // 设置任务开始执行时间
            taskWrap.setTaskExecutorStartTime(System.currentTimeMillis());

            ListenableFuture<?> listenableFuture = threadPool.get(taskWrap.getKlass()).submitListenable(taskWrap.getRunnable());

            listenableFuture.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onSuccess(Object result) {
                    // 设置任务执行结束时间与计算
                    taskWrap.setTaskExecutorEndTime(System.currentTimeMillis());
                    taskWrap.recordTime();

                    // 移动完成任务到队列
                    finishedTasks.get(taskWrap.getKlass()).add(taskWrap);
                }

                @Override
                public void onFailure(Throwable ex) {
                    log.error(ex.getMessage());
                }
            });
        } else {
            threadPool.get(taskWrap.getKlass()).submit(taskWrap.getCallable());
        }
    }

    /**
     * 获取结束任务map
     *
     * @return map
     */
    public ConcurrentHashMap<Class<?>, CopyOnWriteArrayList<TaskWrap>> getFinishedTasks() {
        return finishedTasks;
    }
}
